/*
 * Copyright (c) 2005 - 2010 CAS Dev Team
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 4. Neither the name of the CAS Dev. Team nor the names of its contributors
 *    may be used to endorse or promote products derived from this software
 *    without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 *      ASConsistentHash.hpp
 *
 * $CAS$
 */

#include <CTPP2Types.h>

#include <STLString.hpp>
#include <STLVector.hpp>
#include <STLException.hpp>
#ifdef __linux__ 
#include <algorithm>
#endif

namespace CAS_MOD
{

/**
  @class ConsistentHash ASConsistentHash.hpp <ASConsistentHash.hpp>
  @brief Consistent hash
*/
template <typename T>class ConsistentHash
{
public:
	/**
	  @brief Constructor
	  @param iIMaxContinuumElements - max. number of elements in continuum
	*/
	ConsistentHash(const UINT_32 iIMaxContinuumElements = 100);

	/**
	  @brief Add element to hash
	  @param sElementName - key name
	  @param oData - data to add
	  @param dWeight - element weight
	  @return Element Id
	*/
	INT_32 AddElement(const STLW::string  & sElementName,
	                  const T             & oData,
	                  const W_FLOAT         dWeight);

	/**
	  @brief Remove Element by given name
	  @param sElementName - element name
	  @return 0 - if success, -1 - otherwise
	*/
	INT_32 RemoveElementByName(const STLW::string  & sElementName);

	/**
	  @brief Get Element by key
	  @param sKey - key name
	  @return Nearest element
	*/
	T & GetElementByKey(const STLW::string   & sKey);

	/**
	  @brief Get Element by Id
	  @param iId - element id
	  @return Element
	*/
	T & GetElementById(const INT_32 iId);

	/**
	  @brief Remove Element by Id
	  @param iId - element id
	  @return Element
	*/
	INT_32 RemoveElementById(const INT_32 iId);

	/**
	  @brief Enable element by specified Id
	  @param iId - element id
	  @return 0 - if success, -1 - otherwise
	*/
	INT_32 EnableElementById(const INT_32 iId);

	/**
	  @brief Disable element by specified Id
	  @param iId - element id
	  @return 0 - if success, -1 - otherwise
	*/
	INT_32 DisableElementById(const INT_32 iId);

	/**
	  @brief Get number of elements
	*/
	UINT_32 Size() { return vData.size(); }

	/**
	  @brif A destructor
	*/
	~ConsistentHash() throw();

private:
	/**
	  @brief Data Element storage
	*/
	struct DataElement
	{
		/** Key            */
		STLW::string      key;
		/** Data           */
		T                 data;
		/** Element weight */
		W_FLOAT           weight;
		/** Flag */
		UINT_32           enabled;
	};

	/**
	  @brief Continuum element
	*/
	struct ContinuumElement
	{
		/** Element position */
		UINT_32             pos;
		/** Element hash     */
		UINT_32             hash;
	};

	/**
	  @brief Sorting functor
	*/
	struct ContinuumElementHashSort
	{
		bool operator()(const ContinuumElement  & oX,
		                const ContinuumElement  & oY) const
		{
			return oX.hash < oY.hash;
		}
	};

	/** Max number of elements in continuum  */
	const UINT_32 iMaxContinuumElements;
	/** Max weight of elements in continuum  */
	W_FLOAT       dMaxWeight;

	/** Elements                             */
	STLW::vector<DataElement>       vData;
	/** Continuum                            */
	STLW::vector<ContinuumElement>  vContinuum;

	/**
	  @brief Rebuild continuum
	*/
	void RebuildContinuum();

	/**
	  @brief Hash function
	  @param iHash - hash seed
	  @param szKey - key
	  @param iKeyLen - key length
	  @return hash value
	*/
	static UINT_32 HashFunc(const UINT_32 iHash, CCHAR_P szKey, const UINT_32 iKeyLen);

	/** CRC32 data  */
	static UINT_32 aCRC32[256];
};

template <typename T> UINT_32 ConsistentHash<T>::aCRC32[256] =
{
	0x00000000U, 0x77073096U, 0xEE0E612CU, 0x990951BAU, 0x076DC419U, 0x706AF48FU, 0xE963A535U, 0x9E6495A3U,
	0x0EDB8832U, 0x79DCB8A4U, 0xE0D5E91EU, 0x97D2D988U, 0x09B64C2BU, 0x7EB17CBDU, 0xE7B82D07U, 0x90BF1D91U,
	0x1DB71064U, 0x6AB020F2U, 0xF3B97148U, 0x84BE41DEU, 0x1ADAD47DU, 0x6DDDE4EBU, 0xF4D4B551U, 0x83D385C7U,
	0x136C9856U, 0x646BA8C0U, 0xFD62F97AU, 0x8A65C9ECU, 0x14015C4FU, 0x63066CD9U, 0xFA0F3D63U, 0x8D080DF5U,
	0x3B6E20C8U, 0x4C69105EU, 0xD56041E4U, 0xA2677172U, 0x3C03E4D1U, 0x4B04D447U, 0xD20D85FDU, 0xA50AB56BU,
	0x35B5A8FAU, 0x42B2986CU, 0xDBBBC9D6U, 0xACBCF940U, 0x32D86CE3U, 0x45DF5C75U, 0xDCD60DCFU, 0xABD13D59U,
	0x26D930ACU, 0x51DE003AU, 0xC8D75180U, 0xBFD06116U, 0x21B4F4B5U, 0x56B3C423U, 0xCFBA9599U, 0xB8BDA50FU,
	0x2802B89EU, 0x5F058808U, 0xC60CD9B2U, 0xB10BE924U, 0x2F6F7C87U, 0x58684C11U, 0xC1611DABU, 0xB6662D3DU,
	0x76DC4190U, 0x01DB7106U, 0x98D220BCU, 0xEFD5102AU, 0x71B18589U, 0x06B6B51FU, 0x9FBFE4A5U, 0xE8B8D433U,
	0x7807C9A2U, 0x0F00F934U, 0x9609A88EU, 0xE10E9818U, 0x7F6A0DBBU, 0x086D3D2DU, 0x91646C97U, 0xE6635C01U,
	0x6B6B51F4U, 0x1C6C6162U, 0x856530D8U, 0xF262004EU, 0x6C0695EDU, 0x1B01A57BU, 0x8208F4C1U, 0xF50FC457U,
	0x65B0D9C6U, 0x12B7E950U, 0x8BBEB8EAU, 0xFCB9887CU, 0x62DD1DDFU, 0x15DA2D49U, 0x8CD37CF3U, 0xFBD44C65U,
	0x4DB26158U, 0x3AB551CEU, 0xA3BC0074U, 0xD4BB30E2U, 0x4ADFA541U, 0x3DD895D7U, 0xA4D1C46DU, 0xD3D6F4FBU,
	0x4369E96AU, 0x346ED9FCU, 0xAD678846U, 0xDA60B8D0U, 0x44042D73U, 0x33031DE5U, 0xAA0A4C5FU, 0xDD0D7CC9U,
	0x5005713CU, 0x270241AAU, 0xBE0B1010U, 0xC90C2086U, 0x5768B525U, 0x206F85B3U, 0xB966D409U, 0xCE61E49FU,
	0x5EDEF90EU, 0x29D9C998U, 0xB0D09822U, 0xC7D7A8B4U, 0x59B33D17U, 0x2EB40D81U, 0xB7BD5C3BU, 0xC0BA6CADU,
	0xEDB88320U, 0x9ABFB3B6U, 0x03B6E20CU, 0x74B1D29AU, 0xEAD54739U, 0x9DD277AFU, 0x04DB2615U, 0x73DC1683U,
	0xE3630B12U, 0x94643B84U, 0x0D6D6A3EU, 0x7A6A5AA8U, 0xE40ECF0BU, 0x9309FF9DU, 0x0A00AE27U, 0x7D079EB1U,
	0xF00F9344U, 0x8708A3D2U, 0x1E01F268U, 0x6906C2FEU, 0xF762575DU, 0x806567CBU, 0x196C3671U, 0x6E6B06E7U,
	0xFED41B76U, 0x89D32BE0U, 0x10DA7A5AU, 0x67DD4ACCU, 0xF9B9DF6FU, 0x8EBEEFF9U, 0x17B7BE43U, 0x60B08ED5U,
	0xD6D6A3E8U, 0xA1D1937EU, 0x38D8C2C4U, 0x4FDFF252U, 0xD1BB67F1U, 0xA6BC5767U, 0x3FB506DDU, 0x48B2364BU,
	0xD80D2BDAU, 0xAF0A1B4CU, 0x36034AF6U, 0x41047A60U, 0xDF60EFC3U, 0xA867DF55U, 0x316E8EEFU, 0x4669BE79U,
	0xCB61B38CU, 0xBC66831AU, 0x256FD2A0U, 0x5268E236U, 0xCC0C7795U, 0xBB0B4703U, 0x220216B9U, 0x5505262FU,
	0xC5BA3BBEU, 0xB2BD0B28U, 0x2BB45A92U, 0x5CB36A04U, 0xC2D7FFA7U, 0xB5D0CF31U, 0x2CD99E8BU, 0x5BDEAE1DU,
	0x9B64C2B0U, 0xEC63F226U, 0x756AA39CU, 0x026D930AU, 0x9C0906A9U, 0xEB0E363FU, 0x72076785U, 0x05005713U,
	0x95BF4A82U, 0xE2B87A14U, 0x7BB12BAEU, 0x0CB61B38U, 0x92D28E9BU, 0xE5D5BE0DU, 0x7CDCEFB7U, 0x0BDBDF21U,
	0x86D3D2D4U, 0xF1D4E242U, 0x68DDB3F8U, 0x1FDA836EU, 0x81BE16CDU, 0xF6B9265BU, 0x6FB077E1U, 0x18B74777U,
	0x88085AE6U, 0xFF0F6A70U, 0x66063BCAU, 0x11010B5CU, 0x8F659EFFU, 0xF862AE69U, 0x616BFFD3U, 0x166CCF45U,
	0xA00AE278U, 0xD70DD2EEU, 0x4E048354U, 0x3903B3C2U, 0xA7672661U, 0xD06016F7U, 0x4969474DU, 0x3E6E77DBU,
	0xAED16A4AU, 0xD9D65ADCU, 0x40DF0B66U, 0x37D83BF0U, 0xA9BCAE53U, 0xDEBB9EC5U, 0x47B2CF7FU, 0x30B5FFE9U,
	0xBDBDF21CU, 0xCABAC28AU, 0x53B39330U, 0x24B4A3A6U, 0xBAD03605U, 0xCDD70693U, 0x54DE5729U, 0x23D967BFU,
	0xB3667A2EU, 0xC4614AB8U, 0x5D681B02U, 0x2A6F2B94U, 0xB40BBE37U, 0xC30C8EA1U, 0x5A05DF1BU, 0x2D02EF8DU
};

//
// Constructor
//
template <typename T> ConsistentHash<T>::ConsistentHash(const UINT_32 iIMaxContinuumElements): iMaxContinuumElements(iIMaxContinuumElements), dMaxWeight(0)
{
	;;
}

//
// Add element
//
template <typename T> INT_32 ConsistentHash<T>::AddElement(const STLW::string  & sKey,
                                                           const T             & oData,
                                                           const W_FLOAT         dWeight)
{
	if (dWeight <= 0) { return -1; }

	dMaxWeight += dWeight;

	DataElement oDataElement;
	oDataElement.key     = sKey;
	oDataElement.data    = oData;
	oDataElement.weight  = dWeight;
	oDataElement.enabled = 0;

	// Store data
	vData.push_back(oDataElement);
#ifdef DEBUG
	fprintf(stderr, "AddElement(`%s`, %f), Id = %d, max. weight = %f\n", sKey.c_str(), dWeight, INT_32(vData.size() - 1), dMaxWeight);
#endif
	RebuildContinuum();

// Current element is last element
return vData.size() - 1;
}

//
// Get element by specified key
//
template <typename T> T & ConsistentHash<T>::GetElementByKey(const STLW::string   & sKey)
{
	if (vContinuum.size() == 0) { throw STLW::logic_error("No elements in continuum"); }

	UINT_32 iHash = HashFunc(0, sKey.data(), sKey.size());

	INT_32 iLeft  = 0;
	INT_32 iMid   = 0;
	INT_32 iRight = vContinuum.size() - 1;

	// Border conditions
	if (iHash > vContinuum[iRight].hash || iHash < vContinuum[iLeft].hash)
	{
		for (UINT_32 iPos = 0; iPos < iRight; ++iPos)
		{
			const UINT_32 iElementPos = vContinuum[iPos].pos;
			DataElement & oElement = vData[iElementPos];

			// Get first enabled element
			if (oElement.enabled != -1)
			{
#ifdef DEBUG
	fprintf(stderr, "hash %u, nearest enabled: %d (id %u) hash %u\n\n", iHash, iPos, iElementPos, vContinuum[iPos].hash);
#endif
				return oElement.data;
			}
		}
		throw STLW::logic_error("No active elements in continuum");
	}

	INT_32 iRDelta     = iHash;
	INT_32 iRMid       = 0;

	// Do a binary search
	while (iLeft <= iRight)
	{
		iMid = (iLeft + iRight) / 2;

		const INT_32 iDelta = vContinuum[iMid].hash - iHash;

		if (iDelta > 0) { iRight = iMid - 1; }
		else            { iLeft  = iMid + 1; }

		const UINT_32 iDeltaAbs = (iDelta < 0) ? -iDelta : iDelta;
		if (iRDelta > iDeltaAbs)
		{
			iRMid       = iMid;
			iRDelta     = iDeltaAbs;
		}
	}

	iRight = vContinuum.size() - 1;

	UINT_32 iPos = 0;
	for(;;)
	{
		INT_32  iCurrPosL = iRMid - iPos;
		UINT_32 iCurrPosR = iRMid + iPos + 1;

		bool bBreakCycle = true;
		if (iCurrPosL > 0)
		{
			const UINT_32 iElementPos = vContinuum[iCurrPosL].pos;
			DataElement & oElement = vData[iElementPos];
			// Get first enabled element
			if (oElement.enabled != -1)
			{
#ifdef DEBUG
	fprintf(stderr, "hash %u, nearest enabled(L): %d (id %u) hash %u\n\n", iHash, iCurrPosL, iElementPos, vContinuum[iCurrPosL].hash);
#endif
				return oElement.data;
			}
			bBreakCycle = false;
		}

		if (iCurrPosR <= iRight)
		{
			const UINT_32 iElementPos = vContinuum[iCurrPosR].pos;
			DataElement & oElement = vData[iElementPos];
			// Get first enabled element
			if (oElement.enabled != -1)
			{
#ifdef DEBUG
	fprintf(stderr, "hash %u, nearest enabled(L): %d (id %u) hash %u\n\n", iHash, iCurrPosR, iElementPos, vContinuum[iCurrPosR].hash);
#endif
				return oElement.data;
			}
			bBreakCycle = false;
		}

		if (bBreakCycle) { break; }

		++iPos;
	}

	throw STLW::logic_error("No active elements in continuum");

// This should *never* happened, it's need only to remove compiler warning
return vData[0].data;
}

//
// Get element by specified Id
//
template <typename T> T & ConsistentHash<T>::GetElementById(const INT_32 iId)
{
	return vData.at(iId).data;
}

//
// Remove element by specified Id
//
template <typename T> INT_32 ConsistentHash<T>::RemoveElementById(const INT_32 iId)
{
	STLW::vector<DataElement>       vTMPData;

	const UINT_32 iElements = vData.size();

	if (iId >= iElements) { return -1; }

	UINT_32 iPos = 0;
	for(; iPos < iId; ++iPos) { vTMPData.push_back(vData[iPos]); }

	++iPos;
	for(; iPos < iElements; ++iPos) { vTMPData.push_back(vData[iPos]); }

	dMaxWeight -= vData[iId].weight;

	vData.swap(vTMPData);

	RebuildContinuum();

return 0;
}

//
// Remove element by specified Id
//
template <typename T> INT_32 ConsistentHash<T>::RemoveElementByName(const STLW::string  & sElementName)
{
	const UINT_32 iElements = vData.size();
	for(UINT_32 iPos = 0; iPos < iElements; ++iPos)
	{
		if (vData[iPos].key == sElementName)
		{
			RemoveElementById(iPos);
			return 0;
		}
	}
return -1;
}

//
// Enable element by specified Id
//
template <typename T> INT_32 ConsistentHash<T>::EnableElementById(const INT_32 iId)
{
	const UINT_32 iElements = vData.size();

	if (iId >= iElements) { return -1; }

	vData[iId].enabled = 0;

#ifdef DEBUG
	fprintf(stderr, "Continuum:");
	for (UINT_32 iContinuumPos = 0; iContinuumPos < vContinuum.size(); ++iContinuumPos)
	{
		fprintf(stderr, " (%u %u %c)", vContinuum[iContinuumPos].pos, vContinuum[iContinuumPos].hash, vData[vContinuum[iContinuumPos].pos].enabled == 0 ? 'E':'D');
	}
	fprintf(stderr, "\n");
#endif

return 0;
}

//
// Disable element by specified Id
//
template <typename T> INT_32 ConsistentHash<T>::DisableElementById(const INT_32 iId)
{
	const UINT_32 iElements = vData.size();

	if (iId >= iElements) { return -1; }

	vData[iId].enabled = -1;

#ifdef DEBUG
	fprintf(stderr, "Continuum:");
	for (UINT_32 iContinuumPos = 0; iContinuumPos < vContinuum.size(); ++iContinuumPos)
	{
		fprintf(stderr, " (%u %u %c)", vContinuum[iContinuumPos].pos, vContinuum[iContinuumPos].hash, vData[vContinuum[iContinuumPos].pos].enabled == 0 ? 'E':'D');
	}
	fprintf(stderr, "\n");
#endif

return 0;
}

//
// Rebuild continuum
//
template <typename T> void ConsistentHash<T>::RebuildContinuum()
{
	STLW::vector<ContinuumElement> vTMPContinuum;

	// Re-calculate number of continuum elements
	const UINT_32 iElements = vData.size();
	for (UINT_32 iPos = 0; iPos < iElements; ++iPos)
	{
		const DataElement & oCurrentData = vData[iPos];
//fprintf(stderr, "iContinuumElemens = %u dWeight = %f dMaxWeight = %f, sKey = `%s`\n", iContinuumElemens, dWeight, dMaxWeight, oCurrentData.key.data());

		UINT_32 iHash = 0;
		const UINT_32 iContinuumElemens = iMaxContinuumElements * (oCurrentData.weight / dMaxWeight);
		for (UINT_32 iContinuumPos = 0; iContinuumPos < iContinuumElemens; ++iContinuumPos)
		{
			iHash = HashFunc(iHash, oCurrentData.key.data(), oCurrentData.key.size());
//fprintf(stderr, "iHash = %u\n", iHash);

			ContinuumElement oContinuumElement;
			oContinuumElement.pos  = iPos;
			oContinuumElement.hash = iHash;

			vTMPContinuum.push_back(oContinuumElement);
		}
	}
	// Sort data
	STLW::sort(vTMPContinuum.begin(), vTMPContinuum.end(), ContinuumElementHashSort());

	// Need mutex here!
	vContinuum.swap(vTMPContinuum);

#ifdef DEBUG
	fprintf(stderr, "Continuum:");
	for (UINT_32 iContinuumPos = 0; iContinuumPos < vContinuum.size(); ++iContinuumPos)
	{
		fprintf(stderr, " (%u %u %c)", vContinuum[iContinuumPos].pos, vContinuum[iContinuumPos].hash, vData[vContinuum[iContinuumPos].pos].enabled == 0 ? 'E':'D');
	}
	fprintf(stderr, "\n");
#endif
}

//
// Calculate hash sum
//
template <typename T> UINT_32 ConsistentHash<T>::HashFunc(const UINT_32 iHash, CCHAR_P szKey, const UINT_32 iKeyLen)
{
	CCHAR_P szEnd = szKey + iKeyLen;

	UINT_32 iNewHash = ~iHash;
	while (szKey < szEnd)
	{
		UINT_32 iIndex = (iNewHash ^ UCHAR_8(*szKey)) & 0x000000FFU;
		iNewHash = (iNewHash >> 8) ^ aCRC32[iIndex];
		++szKey;
	}

return (~iNewHash);
}

//
// Destructor
//
template <typename T> ConsistentHash<T>::~ConsistentHash() throw()
{
	;;
}

} // namespace CAS_MOD
// End.

